package com.tplhk.rocketmq;

import com.alibaba.fastjson.JSON;
import com.tplhk.interfaces.ExceptionProcessor;
import com.tplhk.interfaces.OrderProcessor;
import com.tplhk.interfaces.TagProcessor;
import com.tplhk.interfaces.TextProcessor;
import com.tplhk.vo.Args;
import com.tplhk.vo.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.annotation.StreamRetryTemplate;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.*;

/**
 * Demo application that sends and consumes messages through the Spring Cloud Stream
 * bindings {@link TextProcessor}, {@link OrderProcessor}, {@link ExceptionProcessor}
 * and {@link TagProcessor}.
 *
 * <p>On startup {@link #run(String...)} publishes one message per tag to the tag output
 * channel. The {@code @StreamListener} methods log whatever arrives on the bound input
 * channels. The remaining scenarios (plain object, delayed, transactional, ordered and
 * failing messages) are kept as commented-out snippets that can be enabled by hand.
 *
 * <p>Created by xw on 2017/2/20.
 * 2017-02-20 16:51
 */
@Slf4j
@SpringBootApplication
@EnableBinding({TextProcessor.class, OrderProcessor.class, ExceptionProcessor.class, TagProcessor.class})
public class Application implements CommandLineRunner {

    /** Binding used by the plain, delayed and transactional snippets in {@link #run(String...)}. */
    @Autowired
    TextProcessor textProcessor;

    /** Binding used by the ordered-message snippet in {@link #run(String...)}. */
    @Autowired
    OrderProcessor orderProcessor;

    /** Binding used by the failing-message snippet in {@link #run(String...)}. */
    @Autowired
    ExceptionProcessor exceptionProcessor;

    /** Binding used to publish the tagged messages. */
    @Autowired
    TagProcessor tagProcessor;

    /**
     * Boots the application with the {@code rocketmq} profile on port 9000.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        System.setProperty("spring.profiles.active", "rocketmq");
        System.setProperty("server.port", "9000");
        // Forward args so command-line options reach Spring Boot and the CommandLineRunner.
        SpringApplication.run(Application.class, args);
    }

    /**
     * Publishes the demo messages once the application context is ready.
     *
     * <p>Only the tag scenario is active: one {@link Order} with a random id is sent for
     * each of the tags {@code aaa}, {@code bbb} and {@code ccc}. The listeners in this class
     * only declare conditions for {@code aaa} and {@code bbb}.
     *
     * @param strings application arguments (unused)
     */
    @Override
    public void run(String... strings) {
        // --- Scenario: plain object message -------------------------------------------------
//        log.info("普通对象消息发送");
//        Order appleOrder = new Order();
//        appleOrder.setOrderNum("0000001");
//        appleOrder.setNum(10);
//        appleOrder.setType("对象信息发送");
//        appleOrder.setCreateAt(new Date());
//        textProcessor.textOutput().send(MessageBuilder.withPayload(appleOrder).build());
//
        // --- Scenario: delayed message ------------------------------------------------------
        // NOTE(review): the log text says 10 seconds while the order type says 60 seconds; the
        // real delay for level 5 depends on the broker's messageDelayLevel table - confirm.
//        log.info("延迟消息 10 秒后发送");
//        Order appleOrderDelay = new Order();
//        appleOrderDelay.setOrderNum("0000002");
//        appleOrderDelay.setNum(10);
//        appleOrderDelay.setType("60秒后再发送");
//        appleOrderDelay.setCreateAt(new Date());
//        textProcessor.textOutput().send(
//                MessageBuilder.withPayload(appleOrderDelay)
//                        .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 5)
//                        .build()
//        );
//
        // --- Scenario: tagged messages (active) ---------------------------------------------
        log.info("发送 tag 消息");
        // One generator for the whole loop instead of a fresh Random per message.
        Random random = new Random();
        for (String tag : new String[]{"aaa", "bbb", "ccc"}) {
            // Build the payload.
            Order order = new Order();
            order.setOrderId(random.nextLong());
            log.info("orderId = {}, tag = {}", order.getOrderId(), tag);
            // Wrap it in a Spring message.
            Message<Order> springMessage = MessageBuilder.withPayload(order)
                    .setHeader(MessageConst.PROPERTY_TAGS, tag) // set the tag
                    .setHeader(MessageConst.PROPERTY_KEYS, tag) // set the key, to make console lookup easier
                    .build();
            // Send it.
            tagProcessor.tagOutput().send(springMessage);
        }

        // --- Scenario: transactional message ------------------------------------------------
//
//        log.info("事务消息测试");
//        Args args = Args.builder().args1(1).args2("这是一个 header 参数").build();
//        textProcessor.textOutput().send(
//                MessageBuilder.withPayload("由于没有 order 对象，所以在本地事务解析 payload 时一定会抛异常！")
//                        .setHeader(RocketMQHeaders.TRANSACTION_ID, "TX12345678")
//                        .setHeader("args", JSON.toJSONString(args))
//                        .build());

        // --- Scenario: ordered messages -----------------------------------------------------
//        log.info("顺序消息发送");
//        this.buildOrders().stream().forEach(item -> {
//            log.info(item.toString());
//            Message<Order> springMessage = MessageBuilder.withPayload(item)
//                    .setHeader(MessageConst.PROPERTY_KEYS, item.getOrderId()+"="+item.getType())
//                    .build();
//            orderProcessor.outputOrder().send(springMessage);
//        });

        // --- Scenario: message whose consumer fails -----------------------------------------
//        log.info("异常消息发送");
//        Order exOrder = new Order();
//        exOrder.setOrderNum("0000004");
//        exOrder.setNum(777777);
//        exOrder.setType("处理异常消息");
//        exOrder.setCreateAt(new Date());
//        exceptionProcessor.exceptionOutput().send(
//                MessageBuilder.withPayload(exOrder)
//                    // to make console lookup easier
//                    .setHeader(MessageConst.PROPERTY_KEYS, UUID.randomUUID().toString())
//                    .build()
//        );

    }

    /**
     * Logs the payload of every message received on {@link TextProcessor#TEXT_INPUT}.
     *
     * @param message the received message
     */
    @StreamListener(TextProcessor.TEXT_INPUT)
    public void input(Message<Order> message) {
        log.info("一般监听收到：{}", message.getPayload());
    }

    /**
     * Logs every message received on {@link OrderProcessor#ORDER_INPUT} together with the id
     * of the consuming thread.
     *
     * @param message the received message
     */
    @StreamListener(OrderProcessor.ORDER_INPUT)
    public void inputOrder(Message<String> message) {
        log.info("[顺序消息][线程编号:{} 消息内容：{}]", Thread.currentThread().getId(), message);
    }

    // Filtered messages: receive only the messages whose tag is aaa or bbb, as set in the
    // consumer.tag configuration.
//    @StreamListener(TagProcessor.TAG_INPUT)
    /**
     * Filtered messages: handles only messages whose {@code rocketmq_TAGS} header is
     * {@code aaa} and logs the order id.
     *
     * @param message the received message
     */
    @StreamListener(value = TagProcessor.TAG_INPUT, condition = "headers['rocketmq_TAGS'] == 'aaa'")
    public void tagOrder_aaa(Message<Order> message) {
        log.info("[过滤消息 aaa][线程编号:{} 消息内容：{}]", Thread.currentThread().getId(), message.getPayload().getOrderId());
    }

    /**
     * Filtered messages: handles only messages whose {@code rocketmq_TAGS} header is
     * {@code bbb} and logs the order id.
     *
     * @param message the received message
     */
    @StreamListener(value = TagProcessor.TAG_INPUT, condition = "headers['rocketmq_TAGS'] == 'bbb'")
    public void tagOrder_bbb(Message<Order> message) {
        log.info("[过滤消息 bbb][线程编号:{} 消息内容：{}]", Thread.currentThread().getId(), message.getPayload().getOrderId());
    }


    /**
     * Logs every message received on {@link ExceptionProcessor#EXCEPTION_INPUT}. Enable the
     * commented-out {@code throw} to make consumption fail on purpose.
     *
     * @param message the received message
     */
    @StreamListener(ExceptionProcessor.EXCEPTION_INPUT)
    public void exceptionInput(Message<Order> message) {
        log.info("[exceptionInput][线程编号:{} 消息内容：{}]", Thread.currentThread().getId(), message);
//        throw new RuntimeException("我故意抛一个异常");
    }

    // Disabled error handlers: one bound to a specific error channel, one to the global one.
//    @ServiceActivator(inputChannel = "exception-topic.exception-consumer-group-exception-topic.errors")
//    public void handleError(ErrorMessage errorMessage) {
//        log.error("[handleError][payload：{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
//        log.error("[handleError][originalMessage：{}]", errorMessage.getOriginalMessage());
//        log.error("[handleError][headers：{}]", errorMessage.getHeaders());
//    }
//
//    @StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
//    public void globalHandleError(ErrorMessage errorMessage) {
//        log.error("[globalHandleError][payload：{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
//        log.error("[globalHandleError][originalMessage：{}]", errorMessage.getOriginalMessage());
//        log.error("[globalHandleError][headers：{}]", errorMessage.getHeaders());
//    }


    /**
     * Builds simulated order data: ten entries spread over three order ids, with the
     * entries of the different orders interleaved.
     *
     * @return a new mutable list with the entries in a fixed order
     */
    private List<Order> buildOrders() {
        List<Order> orderList = new ArrayList<>();

        orderList.add(newOrder(15103111039L, "创建"));
        orderList.add(newOrder(15103111065L, "创建"));
        orderList.add(newOrder(15103111039L, "付款"));
        orderList.add(newOrder(15103117235L, "创建"));
        orderList.add(newOrder(15103111065L, "付款"));
        orderList.add(newOrder(15103117235L, "付款"));
        orderList.add(newOrder(15103111065L, "完成"));
        orderList.add(newOrder(15103111039L, "推送"));
        orderList.add(newOrder(15103117235L, "完成"));
        orderList.add(newOrder(15103111039L, "完成"));

        return orderList;
    }

    /**
     * Creates an {@link Order} with only its id and type set.
     *
     * @param orderId the order id
     * @param type    the value stored in the order's type field
     * @return the new order
     */
    private static Order newOrder(long orderId, String type) {
        Order order = new Order();
        order.setOrderId(orderId);
        order.setType(type);
        return order;
    }

}

